/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#define USING_LOG_PREFIX SERVER

#include "observer/ob_server_schema_updater.h"
#include "lib/thread/thread_mgr.h"
#include "share/schema/ob_multi_version_schema_service.h"
#include "share/inner_table/ob_inner_table_schema.h"
#include "observer/ob_server.h"

using namespace oceanbase::common;
using namespace oceanbase::share;
using namespace oceanbase::share::schema;

namespace oceanbase {
namespace observer {
ObServerSchemaTask::ObServerSchemaTask() : type_(INVALID), did_retry_(false), schema_info_()
{}

ObServerSchemaTask::ObServerSchemaTask(TYPE type, bool did_retry) : type_(type), did_retry_(did_retry), schema_info_()
{}

ObServerSchemaTask::ObServerSchemaTask(TYPE type, bool did_retry, const ObRefreshSchemaInfo& schema_info)
    : type_(type), did_retry_(did_retry), schema_info_(schema_info)
{}

ObServerSchemaTask::ObServerSchemaTask(TYPE type) : type_(type), did_retry_(false), schema_info_()
{}

ObServerSchemaTask::ObServerSchemaTask(TYPE type, const uint64_t tenant_id, const int64_t schema_version)
    : type_(type), did_retry_(false), schema_info_()
{
  schema_info_.set_tenant_id(tenant_id);
  schema_info_.set_schema_version(schema_version);
}

bool ObServerSchemaTask::need_process_alone() const
{
  return REFRESH == type_ || RELEASE == type_;
}

bool ObServerSchemaTask::is_valid() const
{
  return INVALID != type_;
}

int64_t ObServerSchemaTask::hash() const
{
  uint64_t hash_val = 0;
  hash_val = murmurhash(&type_, sizeof(type_), hash_val);
  if (ASYNC_REFRESH == type_) {
    const uint64_t tenant_id = get_tenant_id();
    const int64_t schema_version = get_schema_version();
    hash_val = murmurhash(&tenant_id, sizeof(tenant_id), hash_val);
    hash_val = murmurhash(&schema_version, sizeof(schema_version), hash_val);
  }
  return static_cast<int64_t>(hash_val);
}

bool ObServerSchemaTask::operator==(const ObServerSchemaTask& other) const
{
  bool bret = (type_ == other.type_);
  if (bret && ASYNC_REFRESH == type_) {
    bret = (get_tenant_id() == other.get_tenant_id()) && (get_schema_version() == other.get_schema_version());
  }
  return bret;
}

bool ObServerSchemaTask::operator<(const ObServerSchemaTask& other) const
{
  bool bret = (type_ < other.type_);
  if (!bret && ASYNC_REFRESH == type_ && ASYNC_REFRESH == other.type_) {
    if (get_tenant_id() < other.get_tenant_id()) {
      bret = true;
    } else if (get_tenant_id() == other.get_tenant_id() && get_schema_version() < other.get_schema_version()) {
      bret = true;
    } else {
      bret = false;
    }
  }
  return bret;
}

bool ObServerSchemaTask::greator_than(const ObServerSchemaTask& lt, const ObServerSchemaTask& rt)
{
  bool bret = (lt.type_ > rt.type_);
  if (!bret && ASYNC_REFRESH == lt.type_ && ASYNC_REFRESH == rt.type_) {
    if (lt.get_tenant_id() > rt.get_tenant_id()) {
      bret = true;
    } else if (lt.get_tenant_id() == rt.get_tenant_id() && lt.get_schema_version() > rt.get_schema_version()) {
      bret = true;
    } else {
      bret = false;
    }
  }
  return bret;
}

bool ObServerSchemaTask::compare_without_version(const ObServerSchemaTask& other) const
{
  return (*this == other);
}

uint64_t ObServerSchemaTask::get_group_id() const
{
  return static_cast<uint64_t>(type_);
}

bool ObServerSchemaTask::is_barrier() const
{
  return false;
}

int ObServerSchemaUpdater::init(const common::ObAddr& host, ObMultiVersionSchemaService* schema_mgr)
{
  int ret = OB_SUCCESS;
  const int64_t MAX_THREAD_CNT = 1;
  const int64_t MAX_TENANT_CNT = 1024;
  if (NULL == schema_mgr) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("schema_mgr must not null");
  } else if (OB_FAIL(task_queue_.init(this, MAX_THREAD_CNT, MAX_TENANT_CNT + 2, "SerScheQueue"))) {
    LOG_WARN("init task queue failed", KR(ret), LITERAL_K(MAX_THREAD_CNT), K(MAX_TENANT_CNT));
  } else {
    host_ = host;
    schema_mgr_ = schema_mgr;
    inited_ = true;
  }
  return ret;
}

void ObServerSchemaUpdater::stop()
{
  if (!inited_) {
    LOG_WARN("not init");
  } else {
    task_queue_.stop();
  }
}

void ObServerSchemaUpdater::wait()
{
  if (!inited_) {
    LOG_WARN("not init");
  } else {
    task_queue_.wait();
  }
}

void ObServerSchemaUpdater::destroy()
{
  if (inited_) {
    stop();
    wait();
    host_.reset();
    schema_mgr_ = NULL;
    inited_ = false;
  }
}

int ObServerSchemaUpdater::process_barrier(const ObServerSchemaTask& task, bool& stopped)
{
  UNUSEDx(task, stopped);
  return OB_NOT_SUPPORTED;
}

int ObServerSchemaUpdater::batch_process_tasks(const ObIArray<ObServerSchemaTask>& batch_tasks, bool& stopped)
{
  int ret = OB_SUCCESS;
  ObCurTraceId::init(host_);
  ObSEArray<ObServerSchemaTask, UNIQ_TASK_QUEUE_BATCH_EXECUTE_NUM> tasks;
  if (!inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("ob_server_schema_updeter is not inited.", KR(ret));
  } else if (stopped) {
    ret = OB_CANCELED;
    LOG_WARN("ob_server_schema_updeter is stopped.", KR(ret));
  } else if (batch_tasks.count() <= 0) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("batch_tasks cnt is 0", KR(ret));
  } else if (OB_FAIL(tasks.assign(batch_tasks))) {
    LOG_WARN("fail to assign task", KR(ret), "task_cnt", batch_tasks.count());
  } else {
    std::sort(tasks.begin(), tasks.end(), ObServerSchemaTask::greator_than);
    ObServerSchemaTask::TYPE type = tasks.at(0).type_;
    if ((ObServerSchemaTask::REFRESH == type || ObServerSchemaTask::RELEASE == type) && (1 != tasks.count())) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("refresh/release schema task should process alone", KR(ret), "task_cnt", tasks.count());
    } else if (ObServerSchemaTask::REFRESH == type) {
      if (OB_FAIL(process_refresh_task(tasks.at(0)))) {
        LOG_WARN("fail to process refresh task", KR(ret), K(tasks.at(0)));
      }
    } else if (ObServerSchemaTask::RELEASE == type) {
      if (OB_FAIL(process_release_task())) {
        LOG_WARN("fail to process release task", KR(ret), K(tasks.at(0)));
      }
    } else if (ObServerSchemaTask::ASYNC_REFRESH == type) {
      if (OB_FAIL(process_async_refresh_tasks(tasks))) {
        LOG_WARN("fail to process async refresh tasks", KR(ret));
      }
    } else {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("invalid type", KR(ret), K(type));
    }
  }
  ObCurTraceId::reset();
  return ret;
}

int ObServerSchemaUpdater::process_refresh_task(const ObServerSchemaTask& task)
{
  int ret = OB_SUCCESS;
  const ObRefreshSchemaInfo& schema_info = task.schema_info_;
  ObTaskController::get().switch_task(share::ObTaskType::SCHEMA);
  THIS_WORKER.set_timeout_ts(INT64_MAX);
  LOG_INFO("[REFRESH_SCHEMA] start to process schema refresh task", KR(ret), K(schema_info));
  if (OB_ISNULL(schema_mgr_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("schema_mgr_ is NULL", KR(ret));
  } else if (OB_ISNULL(GCTX.root_service_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("rootservice is null", KR(ret));
  } else if (GCTX.root_service_->in_service() && !GCTX.root_service_->is_full_service()) {
    // Rootservice will refresh schema(holding mutex lock) when it restarts,
    // so refresh schema task triggered by heartbeat should be avoided in such a situation.
    ret = OB_EAGAIN;
    LOG_WARN("rootservice is not in full service, try again",
        KR(ret),
        K(GCTX.root_service_->in_service()),
        K(GCTX.root_service_->is_full_service()));
  } else if (!schema_info.is_valid()) {
    // It should refresh schema by old mode while schema split job is not executed yet.
    bool found = false;
    do {
      int64_t refreshed_schema_version = OB_INVALID_VERSION;
      int64_t received_broadcast_version = OB_INVALID_VERSION;
      int64_t begin_time = ::oceanbase::common::ObTimeUtility::current_time();
      LOG_INFO("[REFRESH_SCHEMA] begin refresh schema, ", K(begin_time));
      if (OB_FAIL(schema_mgr_->get_tenant_received_broadcast_version(OB_SYS_TENANT_ID, received_broadcast_version))) {
        LOG_WARN("fail to get received broadcast version", KR(ret));
      } else if (OB_FAIL(schema_mgr_->refresh_and_add_schema(received_broadcast_version))) {
        LOG_WARN("fail to refresh and add schema", KR(ret));
      } else if (OB_FAIL(
                     schema_mgr_->get_tenant_refreshed_schema_version(OB_SYS_TENANT_ID, refreshed_schema_version))) {
        LOG_WARN("fail to get refreshed schema version", KR(ret));
      }
      LOG_INFO("[REFRESH_SCHEMA] end refresh schema with old mode, ",
          KR(ret),
          "used time",
          ::oceanbase::common::ObTimeUtility::current_time() - begin_time);

      if (OB_SUCCESS != ret || (refreshed_schema_version >= received_broadcast_version)) {
        found = true;
      }
    } while (!found && task.did_retry_);
  } else {
    // For compatibility, schema service will start in old mode.
    // Rootservice will broadcast new schema info by heartbeat while cluster already run schema split job.
    // So, observer's refresh schema task should run in different mode according to the schema info from heartbeat.
    if (!GCTX.is_schema_splited()) {
      int64_t sys_schema_version = OB_INVALID_VERSION;
      if (OB_FAIL(schema_mgr_->get_tenant_refreshed_schema_version(OB_SYS_TENANT_ID, sys_schema_version))) {
        LOG_WARN("fail to get sys refreshed schema version", KR(ret));
      } else if (sys_schema_version <= OB_CORE_SCHEMA_VERSION) {
        // While cluster bootstraped with new schema refresh mode, sys tables which depended on old mode maybe empty.
        // To avoid failure of refresh schema in old mode, refresh schema task should run in different mode according to
        // schema info from heartbeat.
        (void)GCTX.set_split_schema_version(schema_info.get_split_schema_version());
      }
    }
    ObRefreshSchemaInfo local_schema_info;
    const uint64_t new_sequence_id = schema_info.get_sequence_id();
    uint64_t last_sequence_id = OB_INVALID_ID;
    if (OB_FAIL(ret)) {
    } else if (OB_FAIL(schema_mgr_->get_last_refreshed_schema_info(local_schema_info))) {
      LOG_WARN("fail to get local schema info", KR(ret));
    } else if (FALSE_IT(last_sequence_id = local_schema_info.get_sequence_id())) {
    } else if (OB_INVALID_ID != last_sequence_id && last_sequence_id >= new_sequence_id) {
      // skip
      LOG_INFO("[REFRESH_SCHEMA] local schema info is newer, no need to refresh schema",
          KR(ret),
          K(local_schema_info),
          K(schema_info));
    } else {
      // empty tenant_ids means refresh all tenants' schema.
      ObSEArray<uint64_t, 1> tenant_ids;
      uint64_t local_sequence_id = local_schema_info.get_sequence_id();
      uint64_t new_sequence_id = schema_info.get_sequence_id();
      // It means observer don't lost heartbeat if sequence_id is consistent, so observer can only refresh specific
      // tenant's schema.
      if (local_schema_info.is_valid() && OB_INVALID_VERSION != local_sequence_id &&
          local_sequence_id + 1 == new_sequence_id) {
        uint64_t refresh_tenant_id = schema_info.get_tenant_id();
        if (OB_FAIL(tenant_ids.push_back(refresh_tenant_id))) {
          LOG_WARN("fail to push back tenant_id", KR(ret), K(refresh_tenant_id));
        } else {
          LOG_INFO("[REFRESH_SCHEMA] refresh schema by tenant",
              KR(ret),
              K(refresh_tenant_id),
              K(local_schema_info),
              K(schema_info));
        }
      }
      int64_t begin_time = ::oceanbase::common::ObTimeUtility::current_time();
      LOG_INFO("[REFRESH_SCHEMA] begin refresh schema, ", K(begin_time));
      bool use_new_mode = true;
      bool check_bootstrap = (OB_INVALID_ID == new_sequence_id);
      if (FAILEDx(schema_mgr_->refresh_and_add_schema(tenant_ids, use_new_mode, check_bootstrap))) {
        LOG_WARN("fail to refresh and add schema", KR(ret));
      } else if (OB_FAIL(schema_mgr_->set_last_refreshed_schema_info(schema_info))) {
        LOG_WARN("fail to set last_refreshed_schema_info", KR(ret));
      } else {
        // When run schema split job in upgrade mode, schema service should switch to new mode
        // after schema service once refreshed schema in new mode successfully.
        (void)GCTX.set_split_schema_version(schema_info.get_split_schema_version());
      }
      LOG_INFO("[REFRESH_SCHEMA] end refresh schema with new mode, ",
          KR(ret),
          K(tenant_ids),
          "used time",
          ObTimeUtility::current_time() - begin_time);
    }
    {
      // Schema service in standby cluster should always load refresh schema status
      // to update snapshot version which is used to weak read sys table in observer.
      // Here, we should ignore error code.
      int tmp_ret = OB_SUCCESS;
      bool restore_tenant_exist = false;
      ObSEArray<uint64_t, 1> tenant_ids;
      if (ObSchemaService::g_liboblog_mode_) {
        // Agentserver/liboblog won't load refresh schema status.
        restore_tenant_exist = false;
      } else if (OB_SUCCESS != (tmp_ret = schema_mgr_->check_restore_tenant_exist(tenant_ids, restore_tenant_exist))) {
        LOG_WARN("fail to check restore tenant exist", KR(ret), K(tmp_ret), K(tenant_ids));
        restore_tenant_exist = true;
      }
      if ((GCTX.is_standby_cluster() || restore_tenant_exist) && REACH_TIME_INTERVAL(10 * 1000 * 1000)) {  // 10s
        int tmp_ret = OB_SUCCESS;
        ObSchemaStatusProxy* schema_status_proxy = GCTX.schema_status_proxy_;
        if (OB_ISNULL(schema_status_proxy)) {
          tmp_ret = OB_ERR_UNEXPECTED;
          LOG_WARN("schema_status_proxy is null", K(tmp_ret));
        } else if (OB_SUCCESS != (tmp_ret = schema_status_proxy->load_refresh_schema_status())) {
          LOG_WARN("fail to load refresh schema status", KR(ret), K(tmp_ret));
        }
      }
    }
  }

  // For performance, schema_guard will be cached in one session instead of each SQL statement constructs its own new
  // schema_guard, which may lead to lack of schema slots since schema guard will hold reference of schema mgr in long
  // time. To avoid -4023 error caused by lack of schema slots while refresh schema, observer should try to release
  // cached schema_guard in different sessions in such situation.
  if (OB_EAGAIN == ret) {
    OBSERVER.get_sql_session_mgr().try_check_session();
  }

  // dump schema statistics info
  if (REACH_TIME_INTERVAL(10 * 60 * 1000 * 1000)) {  // 10min
    if (OB_NOT_NULL(schema_mgr_)) {
      schema_mgr_->dump_schema_statistics();
    }
  }
  return ret;
}

int ObServerSchemaUpdater::process_release_task()
{
  int ret = OB_SUCCESS;
  ObTaskController::get().switch_task(share::ObTaskType::SCHEMA);
  THIS_WORKER.set_timeout_ts(INT64_MAX);
  if (OB_ISNULL(schema_mgr_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("schema_mgr_ is NULL", KR(ret));
  } else if (!GCTX.is_schema_splited() || !schema_mgr_->is_sys_full_schema()) {
    // auto release unused memory of schema after schema split
  } else if (OB_FAIL(schema_mgr_->try_eliminate_schema_mgr())) {
    LOG_WARN("fail to eliminate schema mgr", KR(ret));
  }
  LOG_INFO("try to release schema", KR(ret));
  return ret;
}

int ObServerSchemaUpdater::process_async_refresh_tasks(const ObIArray<ObServerSchemaTask>& tasks)
{
  int ret = OB_SUCCESS;
  ObTaskController::get().switch_task(share::ObTaskType::SCHEMA);
  THIS_WORKER.set_timeout_ts(INT64_MAX);
  if (OB_ISNULL(schema_mgr_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("schema_mgr_ is NULL", KR(ret));
  } else if (!GCTX.is_schema_splited()) {
    ret = OB_SCHEMA_EAGAIN;
    LOG_WARN("wait rs heartbeat to trigger schema refresh", KR(ret));
  } else {
    // For each tenant, we can only execute the async refresh schema task which has maximum schema_version.
    ObSEArray<uint64_t, UNIQ_TASK_QUEUE_BATCH_EXECUTE_NUM> tenant_ids;
    for (int64_t i = 0; OB_SUCC(ret) && i < tasks.count(); i++) {
      const ObServerSchemaTask& cur_task = tasks.at(i);
      if (ObServerSchemaTask::ASYNC_REFRESH != cur_task.type_) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("cur task type should be ASYNC_REFRESH", KR(ret), K(cur_task));
      } else if (i > 0) {
        const ObServerSchemaTask& last_task = tasks.at(i - 1);
        if (last_task.get_tenant_id() < cur_task.get_tenant_id() ||
            (last_task.get_tenant_id() == cur_task.get_tenant_id() &&
                last_task.get_schema_version() < cur_task.get_schema_version())) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("cur task should be less than last task", KR(ret), K(last_task), K(cur_task));
        }
      }
      if (OB_SUCC(ret)) {
        int64_t cnt = tenant_ids.count();
        if (0 == cnt || tenant_ids.at(cnt - 1) != cur_task.get_tenant_id()) {
          const uint64_t tenant_id = cur_task.get_tenant_id();
          int64_t local_version = OB_INVALID_VERSION;
          int tmp_ret = OB_SUCCESS;
          if (i > 0 && tasks.at(i - 1).get_tenant_id() == cur_task.get_tenant_id()) {
            // Tasks have been sorted by (tenant_id, schema_version) in desc order, so we just get first task by tenant.
          } else if (OB_SUCCESS != (tmp_ret = schema_mgr_->get_tenant_refreshed_schema_version(
                                        tenant_id, local_version))) {  // ignore ret
            if (OB_ENTRY_NOT_EXIST != tmp_ret) {
              LOG_WARN("failed to get tenant refreshed schema version", KR(tmp_ret), K(tenant_id));
            }
          } else if (cur_task.get_schema_version() > local_version) {
            if (OB_FAIL(tenant_ids.push_back(cur_task.get_tenant_id()))) {
              LOG_WARN("fail to push back task", KR(ret), K(cur_task));
            }
          }
        }
      }
    }
    if (OB_SUCC(ret) && tenant_ids.count() > 0) {
      if (OB_FAIL(schema_mgr_->refresh_and_add_schema(tenant_ids))) {
        LOG_WARN("fail to refresh schema", KR(ret), K(tenant_ids));
      }
    }
  }
  LOG_INFO("try to async refresh schema", KR(ret));
  return ret;
}

int ObServerSchemaUpdater::try_reload_schema(const int64_t schema_version)
{
  int ret = OB_SUCCESS;
  if (!inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("ob_server_schema_updeter is not inited.", KR(ret));
  } else {
    int64_t local_version = schema_mgr_->get_refreshed_schema_version();
    if (local_version > schema_version) {
      // LOG_WARN("check schema local version gt than new version:"
      //          "local[%ld], new[%ld]", local_version, schema_version);
    } else if (schema_version <= OB_CORE_SCHEMA_VERSION) {
      _OB_LOG(INFO,
          "schema_vsersion[%ld] <= CORE_SCHEMA_VERSION[%ld],"
          " no need to update schema",
          schema_version,
          OB_CORE_SCHEMA_VERSION);
    } else if (local_version < schema_version) {
      schema_mgr_->set_received_broadcast_version(schema_version);
      // @todo did_retry = false for READ_ZONE
      const bool did_retry = true;
      ObServerSchemaTask refresh_task(ObServerSchemaTask::REFRESH, did_retry);
      if (OB_FAIL(task_queue_.add(refresh_task))) {
        if (OB_EAGAIN != ret) {
          _OB_LOG(WARN, "schedule fetch new schema task failed:version[%ld], ret[%d]", schema_version, ret);
        }
      } else {
        LOG_INFO("schedule fetch new schema task", K(schema_version));
      }
    } else if (local_version == schema_version) {
      // always gen new response_notify_schema_ts in old mode
      schema_mgr_->set_response_notify_schema_ts(schema::GEN_NEW_VERSION);
    }
  }
  return ret;
}

int ObServerSchemaUpdater::try_reload_schema(const ObRefreshSchemaInfo& schema_info)
{

  int ret = OB_SUCCESS;
  if (!inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("ob_server_schema_updeter is not inited.", KR(ret));
  } else if (OB_ISNULL(schema_mgr_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("schema_service is null", KR(ret));
  } else {
    // Try to update received_broadcast_version which used to check if local schema is new enough for SQL execution.
    // Here, we ignore error since set_tenant_received_broadcast_version() may fail before tenant firstly refresh
    // schema.
    int tmp_ret = OB_SUCCESS;
    if (OB_INVALID_TENANT_ID != schema_info.get_tenant_id() && schema_info.get_schema_version() > 0 &&
        OB_SUCCESS != (tmp_ret = schema_mgr_->set_tenant_received_broadcast_version(
                           schema_info.get_tenant_id(), schema_info.get_schema_version()))) {
      LOG_WARN("fail to set tenant received broadcast version", K(tmp_ret), K(schema_info));
    }

    const bool did_retry = true;
    ObServerSchemaTask refresh_task(ObServerSchemaTask::REFRESH, did_retry, schema_info);
    if (OB_FAIL(task_queue_.add(refresh_task))) {
      if (OB_EAGAIN != ret) {
        LOG_WARN("schedule fetch new schema task failed", KR(ret), K(schema_info));
      }
    } else {
      LOG_INFO("schedule fetch new schema task", KR(ret), K(schema_info));
    }
  }
  return ret;
}

int ObServerSchemaUpdater::try_release_schema()
{
  int ret = OB_SUCCESS;
  ObServerSchemaTask release_task(ObServerSchemaTask::RELEASE);
  if (!inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("ob_server_schema_updeter is not inited.", KR(ret));
  } else if (OB_FAIL(task_queue_.add(release_task))) {
    if (OB_EAGAIN != ret) {
      LOG_WARN("schedule release schema task failed", KR(ret));
    }
  } else {
    LOG_INFO("schedule release schema task", KR(ret));
  }
  return ret;
}

int ObServerSchemaUpdater::async_refresh_schema(const uint64_t tenant_id, const int64_t schema_version)
{
  int ret = OB_SUCCESS;
  ObServerSchemaTask refresh_task(ObServerSchemaTask::ASYNC_REFRESH, tenant_id, schema_version);
  if (!inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("ob_server_schema_updeter is not inited.", KR(ret));
  } else if (OB_INVALID_TENANT_ID == tenant_id || OB_INVALID_ID == tenant_id ||
             !ObSchemaService::is_formal_version(schema_version)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", KR(ret), K(tenant_id), K(schema_version));
  } else if (OB_FAIL(task_queue_.add(refresh_task))) {
    if (OB_EAGAIN != ret) {
      LOG_WARN("schedule async refresh schema task failed", KR(ret), K(tenant_id), K(schema_version));
    }
  } else {
    LOG_INFO("schedule async refresh schema task", KR(ret), K(tenant_id), K(schema_version));
  }
  return ret;
}
}  // namespace observer
}  // namespace oceanbase
